"""
Case Type   : 基础功能
Case Name   : 不存在关联表时创建发布且指定发布表列表无only参数
Create At   : 2022/1/24
Owner       : opentestcase026
Description :
    1.在两个集群创建表
    2.创建发布端
    3.创建订阅
    4.修改表数据
    5.查询是否同步
    6.创建发布端
    7.修改数据,并检查是否同步
    8.创建发布端
    9.创建发布端
Expect      :
    1.成功
    2.成功
    3.成功
    4.成功
    5.tb_pubsub_case006_1更新,tb_pubsub_case006_2未更新
    6.成功
    7.均更新
    8.失败
    9.失败
History     :
    Modified by opentestcase026 2022/5/5:增加等待5.5s,适配最新代码,并删除guc参数还原
    步骤,减少用例执行耗时
    Modified by opentestcase026 2022/6/17:订阅端新增系统表pg_subscription_rel,同步数
    据时需查询该系统表中同步表状态,发布端重新创建后,需要更新订阅端,该表才能更
    新
    Modified by opentestcase004 2022/8/3:优化用例,增加数据库状态查询 & 增加基础数据
    同步功能后订阅端有基础数据的同时再在订阅端修改表数据,主键冲突,关闭基础数据
    同步
    Modified by opentestcase004 2022/8/5:setup添加断言
"""
import unittest
import os
from yat.test import macro
from yat.test import Node
from testcase.utils.Logger import Logger
from testcase.utils.CommonSH import CommonSH
from testcase.utils.Common import Common
from testcase.utils.Constant import Constant

# Module-level CommonSH helper for the 'PrimaryDbUser' node; the skipIf
# decorator on the test class calls its get_node_num() at import time to
# decide whether the case runs in this environment.
Primary_SH = CommonSH('PrimaryDbUser')


@unittest.skipIf(3 != Primary_SH.get_node_num(), '非1+2环境不执行')
class Pubsubclass(unittest.TestCase):
    """Publication created FOR TABLE with an explicit table list.

    The case runs against two clusters: the publisher ('PrimaryDbUser') and
    the subscriber ('remote1_PrimaryDbUser'). It checks that only tables
    listed in the publication are replicated, that re-creating the
    publication with more tables plus a subscription refresh replicates the
    added table, and that CREATE PUBLICATION with an empty or non-existent
    table list is rejected.

    The whole class is skipped unless Primary_SH.get_node_num() returns 3.
    """

    def setUp(self):
        """Verify both clusters are healthy and prepare them for replication.

        Builds the node/shell helpers and object names used by the test,
        asserts the cluster status of both sides, reloads the replication
        host entries on both sides and, when needed, switches the publisher
        to wal_level=logical (which requires a cluster restart).
        """
        self.log = Logger()
        self.log.info("-----------this is setup-----------")
        self.log.info(f"-----{os.path.basename(__file__)[:-3]} start-----")
        self.pri_userdb_pub = Node(node='PrimaryDbUser')
        self.pri_userdb_sub = Node(node='remote1_PrimaryDbUser')
        self.constant = Constant()
        self.commsh_pub = CommonSH('PrimaryDbUser')
        self.commsh_sub = CommonSH('remote1_PrimaryDbUser')
        self.com_pub = Common()
        self.tb_name1 = 'tb_pubsub_case006_1'
        self.tb_name2 = 'tb_pubsub_case006_2'
        self.subname = "sub_case006"
        self.pubname = "pub_case006"
        # The subscription connection string uses the publisher port + 1.
        self.port = str(int(self.pri_userdb_pub.db_port) + 1)
        self.wal_level = self.com_pub.show_param("wal_level")
        # Credentials passed as the sql_type argument of execut_db_sql.
        self.user_param_pub = f'-U {self.pri_userdb_pub.db_user} ' \
            f'-W {self.pri_userdb_pub.db_password}'
        self.user_param_sub = f'-U {self.pri_userdb_sub.db_user} ' \
            f'-W {self.pri_userdb_sub.db_password}'
        text = '--step:预置条件,修改pg_hba expect:成功'
        self.log.info(text)

        # Both clusters must report 'Normal' exactly four times in the
        # detailed status output before the test may proceed.
        self.log.info('----发布端状态----')
        status = self.commsh_pub.get_db_cluster_status('detail')
        self.log.info(status)
        self.assertEqual(status.count('Normal'), 4, '执行失败' + text)
        self.log.info('----订阅端状态----')
        status = \
            self.commsh_sub.get_db_cluster_status(
                'detail', env_path=macro.DB_ENV_PATH_REMOTE1)
        self.log.info(status)
        self.assertEqual(status.count('Normal'), 4, '执行失败' + text)

        # Publisher side: reload a replication host entry for the
        # subscriber's user and host.
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    replication  {self.pri_userdb_sub.db_user} '
            f'{self.pri_userdb_sub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)

        # Only set wal_level and restart when it is not already 'logical';
        # a restart result containing 'Degrade' is accepted as well.
        if 'logical' != self.wal_level:
            result = self.commsh_pub.execute_gsguc(
                'set', self.constant.GSGUC_SUCCESS_MSG, 'wal_level=logical')
            self.assertTrue(result, '执行失败:' + text)
            result = self.commsh_pub.restart_db_cluster(True)
            flg = self.constant.START_SUCCESS_MSG in result or \
                'Degrade' in result
            self.assertTrue(flg, '执行失败:' + text)

        # Subscriber side: reload a replication host entry for the
        # publisher's user and host, using the remote instance/env paths.
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host    replication  {self.pri_userdb_pub.db_user} '
            f'{self.pri_userdb_pub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)

    def test_pubsub(self):
        """Run steps 1-9 of the publication/subscription scenario."""
        text = '--step1:两个集群均创建表 expect:成功--'
        self.log.info(text)
        sql = f"create table {self.tb_name1}(i int primary key, t text);" \
            f"create table {self.tb_name2}(i int primary key, t text);"
        result = self.commsh_pub.execut_db_sql(
            sql, sql_type=self.user_param_pub)
        self.log.info(result)
        self.assertEqual(result.count(self.constant.TABLE_CREATE_SUCCESS),
                         4, '执行失败:' + text)
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_sub, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertEqual(result.count(self.constant.TABLE_CREATE_SUCCESS),
                         4, '执行失败:' + text)

        # Publication initially covers only the first table.
        text = '--step2:创建发布端 expect:成功--'
        self.log.info(text)
        sql = f"CREATE PUBLICATION {self.pubname} " \
            f"FOR TABLE {self.tb_name1};"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_pub)
        self.log.info(result)
        self.assertIn(self.constant.create_pub_succ_msg, result,
                      '执行失败:' + text)
        self.assertNotIn(self.constant.SQL_WRONG_MSG[1], result,
                         '执行失败:' + text)

        text = '--step3:创建订阅 expect:成功--'
        self.log.info(text)
        result = self.commsh_sub.execute_generate(
            macro.COMMON_PASSWD, env_path=macro.DB_ENV_PATH_REMOTE1)
        # NOTE(review): '' is contained in every string, so this assertion
        # cannot fail; kept unchanged because the expected output of
        # execute_generate is not visible here - confirm and tighten.
        self.assertIn('', result, '执行失败:' + text)
        # NOTE(review): the connection string uses ssh_password as the
        # database password - presumably both are identical in the test
        # environment; verify against the node configuration.
        # copy_data=false: existing publisher rows are not copied when the
        # subscription is created.
        sql = f"CREATE SUBSCRIPTION {self.subname} CONNECTION " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} " \
            f"user={self.pri_userdb_pub.db_user} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.pri_userdb_pub.ssh_password}' " \
            f"PUBLICATION {self.pubname} with (copy_data=false);"
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_sub, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertIn(self.constant.create_sub_succ_msg,
                      result, '执行失败:' + text)

        # The 5.5 s sleep between the inserts gives replication time to
        # deliver the first row before the subscriber is queried.
        text = '--step4:修改表数据expect:成功--'
        self.log.info(text)
        sql = f"insert into {self.tb_name1} values(1, 'first');" \
            f"select pg_sleep(5.5);" \
            f"insert into {self.tb_name2} values(1, 'first');"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_pub)
        self.log.info(result)
        self.assertEqual(result.count(self.constant.INSERT_SUCCESS_MSG),
                         2, '执行失败'+text)

        # Only the published table may have received its row.
        text = '--step5:查询是否同步 ' \
               'expect:tb_pubsub_case006_1更新,tb_pubsub_case006_2未更新'
        self.log.info(text)
        sql = f"select * from {self.tb_name1};" \
            f"select * from {self.tb_name2};"
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_sub, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertEqual(result.count('0 rows'), 1, '执行失败' + text)
        self.assertEqual(result.count('1 row'), 1, '执行失败' + text)
        # Third output line is expected to be the data row of the first
        # query (after the header and separator lines).
        self.assertIn('1 | first', result.splitlines()[2], '执行失败' + text)

        # Re-create the publication so that it lists both tables.
        text = '--step6:创建发布端 expect:成功--'
        self.log.info(text)
        sql = f"drop PUBLICATION {self.pubname};" \
            f"CREATE PUBLICATION {self.pubname} " \
            f"FOR TABLE {self.tb_name1},{self.tb_name2};"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_pub)
        self.log.info(result)
        self.assertIn(self.constant.create_pub_succ_msg, result,
                      '执行失败:' + text)
        self.assertNotIn(self.constant.SQL_WRONG_MSG[1], result,
                         '执行失败:' + text)

        text = '--step7:修改数据,并检查是否同步expect:均更新--'
        self.log.info(text)
        sql = f"delete from {self.tb_name1};" \
            f"insert into {self.tb_name2} values(2, 'first');"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_pub)
        self.log.info(result)
        self.assertEqual(result.count(self.constant.INSERT_SUCCESS_MSG),
                         1, '执行失败' + text)
        self.assertEqual(result.count(self.constant.DELETE_SUCCESS_MSG),
                         1, '执行失败' + text)
        # The subscription must be refreshed after the publication was
        # re-created before the subscriber picks up the new table list.
        sql = f"alter subscription {self.subname} refresh publication;" \
            f"select pg_sleep(1);select * from {self.tb_name1};" \
            f"select * from {self.tb_name2};"
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_sub, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertIn(self.constant.alter_sub_succ_msg, result, '执行失败' +
                      text)
        # NOTE(review): the output holds three result sets (pg_sleep and
        # both tables), so the row-count footers counted below presumably
        # include the one printed for pg_sleep - confirm the intent.
        self.assertEqual(result.count('0 rows'), 1, '执行失败' + text)
        self.assertEqual(result.count('1 row'), 1, '执行失败' + text)
        self.assertIn('2 | first', result, '执行失败' + text)

        # Empty table list after FOR TABLE must be rejected. The DROP here
        # removes the publication created in step 6.
        text = '--step8:创建发布端 expect:失败--'
        self.log.info(text)
        sql = f"drop PUBLICATION {self.pubname};" \
            f"CREATE PUBLICATION {self.pubname} FOR TABLE ;"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_pub)
        self.log.info(result)
        self.assertIn(self.constant.SQL_WRONG_MSG[1], result,
                      '执行失败:' + text)

        # A non-existent table must be rejected. Step 8 already dropped the
        # publication and its CREATE failed, so the publication no longer
        # exists at this point: 'if exists' keeps the DROP from producing
        # its own error, which would otherwise satisfy the assertion below
        # regardless of the CREATE outcome.
        text = '--step9:创建发布端 expect:失败--'
        self.log.info(text)
        sql = f"drop PUBLICATION if exists {self.pubname};" \
            f"CREATE PUBLICATION {self.pubname} FOR TABLE not_exist;"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_pub)
        self.log.info(result)
        self.assertIn(self.constant.SQL_WRONG_MSG[1], result,
                      '执行失败:' + text)

    def tearDown(self):
        """Drop the test objects on both clusters and reload host entries.

        All cleanup commands are executed first and their results are
        asserted only at the end, so a failed check does not prevent the
        remaining cleanup commands from being issued.
        """
        self.log.info('------------this is tearDown-------------')
        text = '--清理环境--'
        self.log.info(text)
        sql = f"DROP PUBLICATION if exists {self.pubname};"
        drop_pub_result = self.commsh_pub.execut_db_sql(
            sql, sql_type=self.user_param_pub)
        self.log.info(drop_pub_result)
        sql = f"DROP SUBSCRIPTION  {self.subname};"
        drop_sub_result = self.commsh_sub.execut_db_sql(
            sql, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(drop_sub_result)
        # The same DROP TABLE statement is run on subscriber and publisher.
        sql = f"DROP table  {self.tb_name2},{self.tb_name1};"
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_sub, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_pub)
        self.log.info(result)
        # Reload the host entries without an auth method - presumably this
        # removes the entries added in setUp; verify against execute_gsguc.
        guc_res1 = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    replication  {self.pri_userdb_sub.db_user} '
            f'{self.pri_userdb_sub.db_host}/32')
        self.log.info(guc_res1)
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host    replication  {self.pri_userdb_pub.db_user} '
            f'{self.pri_userdb_pub.db_host}/32',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)
        self.assertTrue(guc_res1, '执行失败:' + text)
        self.assertIn(self.constant.drop_pub_succ_msg, drop_pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.drop_sub_succ_msg, drop_sub_result,
                      '执行失败' + text)
        self.log.info(f"-----{os.path.basename(__file__)[:-3]} end-----")
